home *** CD-ROM | disk | FTP | other *** search
- #! /usr/bin/env python
-
- """RCS Proxy.
-
- Provide a simplified interface on RCS files, locally or remotely.
- The functionality is geared towards implementing some sort of
- remote CVS like utility. It is modeled after the similar module
- FSProxy.
-
- The module defines two classes:
-
- RCSProxyLocal -- used for local access
- RCSProxyServer -- used on the server side of remote access
-
- The corresponding client class, RCSProxyClient, is defined in module
- rcsclient.
-
- The remote classes are instantiated with an IP address and an optional
- verbosity flag.
- """
-
- import server
- import md5
- import os
- import fnmatch
- import string
- import tempfile
- import rcslib
-
-
- class DirSupport:
-
- def __init__(self):
- self._dirstack = []
-
- def __del__(self):
- self._close()
-
- def _close(self):
- while self._dirstack:
- self.back()
-
- def pwd(self):
- return os.getcwd()
-
- def cd(self, name):
- save = os.getcwd()
- os.chdir(name)
- self._dirstack.append(save)
-
- def back(self):
- if not self._dirstack:
- raise os.error, "empty directory stack"
- dir = self._dirstack[-1]
- os.chdir(dir)
- del self._dirstack[-1]
-
- def listsubdirs(self, pat = None):
- files = os.listdir(os.curdir)
- files = filter(os.path.isdir, files)
- return self._filter(files, pat)
-
- def isdir(self, name):
- return os.path.isdir(name)
-
- def mkdir(self, name):
- os.mkdir(name, 0777)
-
- def rmdir(self, name):
- os.rmdir(name)
-
-
- class RCSProxyLocal(rcslib.RCS, DirSupport):
-
- def __init__(self):
- rcslib.RCS.__init__(self)
- DirSupport.__init__(self)
-
- def __del__(self):
- DirSupport.__del__(self)
- rcslib.RCS.__del__(self)
-
- def sumlist(self, list = None):
- return self._list(self.sum, list)
-
- def sumdict(self, list = None):
- return self._dict(self.sum, list)
-
- def sum(self, name_rev):
- f = self._open(name_rev)
- BUFFERSIZE = 1024*8
- sum = md5.new()
- while 1:
- buffer = f.read(BUFFERSIZE)
- if not buffer:
- break
- sum.update(buffer)
- self._closepipe(f)
- return sum.digest()
-
- def get(self, name_rev):
- f = self._open(name_rev)
- data = f.read()
- self._closepipe(f)
- return data
-
- def put(self, name_rev, data, message=None):
- name, rev = self._unmangle(name_rev)
- f = open(name, 'w')
- f.write(data)
- f.close()
- self.checkin(name_rev, message)
- self._remove(name)
-
- def _list(self, function, list = None):
- """INTERNAL: apply FUNCTION to all files in LIST.
-
- Return a list of the results.
-
- The list defaults to all files in the directory if None.
-
- """
- if list is None:
- list = self.listfiles()
- res = []
- for name in list:
- try:
- res.append((name, function(name)))
- except (os.error, IOError):
- res.append((name, None))
- return res
-
- def _dict(self, function, list = None):
- """INTERNAL: apply FUNCTION to all files in LIST.
-
- Return a dictionary mapping files to results.
-
- The list defaults to all files in the directory if None.
-
- """
- if list is None:
- list = self.listfiles()
- dict = {}
- for name in list:
- try:
- dict[name] = function(name)
- except (os.error, IOError):
- pass
- return dict
-
-
- class RCSProxyServer(RCSProxyLocal, server.SecureServer):
-
- def __init__(self, address, verbose = server.VERBOSE):
- RCSProxyLocal.__init__(self)
- server.SecureServer.__init__(self, address, verbose)
-
- def _close(self):
- server.SecureServer._close(self)
- RCSProxyLocal._close(self)
-
- def _serve(self):
- server.SecureServer._serve(self)
- # Retreat into start directory
- while self._dirstack: self.back()
-
-
- def test_server():
- import string
- import sys
- if sys.argv[1:]:
- port = string.atoi(sys.argv[1])
- else:
- port = 4127
- proxy = RCSProxyServer(('', port))
- proxy._serverloop()
-
-
- def test():
- import sys
- if not sys.argv[1:] or sys.argv[1] and sys.argv[1][0] in '0123456789':
- test_server()
- sys.exit(0)
- proxy = RCSProxyLocal()
- what = sys.argv[1]
- if hasattr(proxy, what):
- attr = getattr(proxy, what)
- if callable(attr):
- print apply(attr, tuple(sys.argv[2:]))
- else:
- print `attr`
- else:
- print "%s: no such attribute" % what
- sys.exit(2)
-
-
- if __name__ == '__main__':
- test()
-